Boto3(AWS SDK for Python)でSQSに送信したメッセージをLambdaでポーリングして受信してみた
こんにちは、みかみです。
さくら缶だと中身もおいしく感じる不思議..(近頃スーパードライの消費率上がってます。。
はじめに
やりたいこと
- boto3 で SQS を操作してみたい
- スケジュールイベントで実行される Lambda関数 をつくってみたい
- SQS に保持されたメッセージを Lambda でポーリングして受信したい
確認環境(local)
- Python(3.6.0)インストール済み
- AWS CLIインストール&設定済み(AWS CLI のインストールと設定)
- boto3 を pip install 済み
SQSの準備
キューの作成&メッセージ送信
Boto3(AWS SDK for Python)を使って、SQSにキューを作成&メッセージを送信します。
import boto3 name = 'test-load-mikami' sqs = boto3.resource('sqs') try: # キューの名前を指定してインスタンスを取得 queue = sqs.get_queue_by_name(QueueName=name) except: # 指定したキューがない場合はexceptionが返るので、キューを作成 queue = sqs.create_queue(QueueName=name) # メッセージ×3をキューに送信 msg_num = 3 msg_list = [{'Id' : '{}'.format(i+1), 'MessageBody' : 'msg_{}'.format(i+1)} for i in range(msg_num)] response = queue.send_messages(Entries=msg_list) print(response)
コンソールから実行します。
C:\Users\mikami.yuki\work\test_sqs>py send_msg.py {'Successful': [{'Id': '1', 'MessageId': '92011f4d-472c-40e5-b166-ef55738e0380', 'MD5OfMessageBody': '9980c8a9248139f14f4165e5d53088aa'}, {'Id': '2', 'MessageId': '8231a409-8a98-461f-bcfb-66ccb6f75cad', 'MD5OfMessageBody': '0fd4e86b2daa009cd9929641dbd7dab6'}, {'Id': '3', 'MessageId': '3ee20fce-5b98-491d-83f9-c7466ab1d018', 'MD5OfMessageBody': '8b30166735242c192258d4974f662a5f'}], 'ResponseMetadata': {'RequestId': 'b44ab343-85a0-5ebe-96a5-e27bed9e8c1d', 'HTTPStatusCode': 200, 'HTTPHeaders': {'server': 'Server', 'date': 'Tue, 07 Mar 2017 07:49:18 GMT', 'content-type': 'text/xml', 'content-length': '861', 'connection': 'keep-alive', 'x-amzn-requestid': 'b44ab343-85a0-5ebe-96a5-e27bed9e8c1d'}, 'RetryAttempts': 0}}
AWS管理コンソールから確認してみると、ちゃんとメッセージがエンキューされました。
メッセージ受信確認
送信したメッセージがちゃんと取れるか確認してみます。
import boto3 # キューの名前を指定して name = 'test-load-mikami' sqs = boto3.resource('sqs') queue = sqs.get_queue_by_name(QueueName=name) while True: # メッセージを取得 msg_list = queue.receive_messages(MaxNumberOfMessages=10) if msg_list: for message in msg_list: print(message.body) message.delete() else: # メッセージがなくなったらbreak break
コンソールから実行してみると、ちゃんとメッセージのbodyがとれています。
C:\Users\mikami.yuki\work\test_sqs>py get_msg.py msg_1 msg_2 msg_3
AWS管理コンソールから確認してみると、すべてのメッセージがデキューされました。
boto3 receive_messages() の取得メッセージ数
SQS.Queue.receive_messages() のリファレンスに以下の記載がありました。
- MaxNumberOfMessages で一度に受信するメッセージの最大数を指定可能
- MaxNumberOfMessages には、1~10が指定できる
- MaxNumberOfMessages を指定しなかった場合のデフォルト値は1
ふむふむ。
MaxNumberOfMessages=10 で receive_message() すれば、キューの中の3つのメッセージ全部取れるはず↓
for message in queue.receive_messages(MaxNumberOfMessages=10): print(message.body) message.delete()
と思ったのですが、なぜか、全部取れない。。(1個しか取れなかったリ、2個しか取れなかったり。。
よくよく読んでみると、、
MaxNumberOfMessages (integer) -- The maximum number of messages to return. Amazon SQS never returns more messages than this value (however, fewer messages might be returned). Valid values are 1 to 10. Default is 1.
「however, fewer messages might be returned」(「ただし、返されるメッセージは少なくなります」by Google 翻訳)
なるほど、です。。(指定した数取れるとは限らないのですね。。
Lambdaの準備
Lambda関数の作成
AWS管理コンソールから、スケジュールイベントでSQSメッセージを受信するLambda関数を作成します。
「Lambda関数の作成」ボタンから、Python 2.7 の lambda-canaryを選択。
コードは先ほどメッセージの受信確認に使用したものを、インライン編集で入力しました。
保存してテストしてみます。
START RequestId: 10ecf83a-0303-11e7-8cf7-05b21b9da4a5 Version: $LATEST An error occurred (AWS.SimpleQueueService.NonExistentQueue) when calling the GetQueueUrl operation: The specified queue does not exist or you do not have access to it.: QueueDoesNotExist Traceback (most recent call last): File "/var/task/lambda_function.py", line 14, in lambda_handler queue = sqs.get_queue_by_name(QueueName=name) File "/var/runtime/boto3/resources/factory.py", line 520, in do_action response = action(self, *args, **kwargs) File "/var/runtime/boto3/resources/action.py", line 83, in __call__ response = getattr(parent.meta.client, operation_name)(**params) File "/var/runtime/botocore/client.py", line 253, in _api_call return self._make_api_call(operation_name, kwargs) File "/var/runtime/botocore/client.py", line 543, in _make_api_call raise error_class(parsed_response, operation_name) QueueDoesNotExist: An error occurred (AWS.SimpleQueueService.NonExistentQueue) when calling the GetQueueUrl operation: The specified queue does not exist or you do not have access to it. END RequestId: 10ecf83a-0303-11e7-8cf7-05b21b9da4a5 REPORT RequestId: 10ecf83a-0303-11e7-8cf7-05b21b9da4a5 Duration: 213.61 ms Billed Duration: 300 ms Memory Size: 128 MB Max Memory Used: 25 MB
エラーだそうで。。
「キューがない」とか言われてるのですが、メッセージ取得できることはローカル環境から確認済みなので、ロールを見直してみます。
ロールの追加
Lambda関数作成時、「Lambda 関数ハンドラおよびロール」の項目では、
- 「ロール」プルダウンで「テンプレートから新しいロールを作成」
- 「ポリシーテンプレート」プルダウンで「SQSポーリングのアクセス権限」
を選択しました。(これだけではダメらしい。。
AWS管理コンソール「IAM」の「ロール」から、先ほど作成したロール名を選択し、「ポリシーのアタッチ」で「AmazonSQSFullAccess」を追加します。
再度Lambdaをテスト。
OKです!
スケジュールイベントで実行したLambdaからSQSメッセージを取得
Lambda関数作成時、「トリガーの設定」ページ「スケジュール式」項目で、ポーリング時間を指定しました。
テスト実行で問題なかったので、スケジュールイベントトリガを有効化します。
待つこと1分・・・
→CloudWatchのログを確認してみます。
ちゃんとメッセージ受信できました!
※CloudWatchのロググループ&ログストリームは、Lambda関数のパラメータ context からも参照できます。
まとめ(所感)
SQS も Lambda も、案外手軽にお試しできました。
これぞフルマネージドなAWSの利点ですねv
(でも最近、apache Kafka や RabbitMQ + Celery もちょっと気になる。。